有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

来自两个独立线程的并行java访问队列

因此,我的目标是测量流媒体引擎的性能。它基本上是一个我可以向其发送数据包的库。衡量这一点的想法是生成数据,将其放入队列,让流引擎获取数据并进行处理

我想这样实现它:数据生成器在一个线程中运行,并在一个无休止的循环中生成数据包,最后有一个Thread.sleep(X)。在进行测试时,想法是最小化tisThread.sleep(X),以查看这是否会对流引擎的性能产生影响。数据生成器将创建的包写入一个队列,即一个ConcurrentLinkedQueue,同时也是一个单例

在另一个线程中,我实例化了流引擎,它通过执行queue.remove()从队列中连续删除包。这是在endlees循环中完成的,没有任何睡眠,因为它应该尽可能快地完成

在第一次尝试实现时,我遇到了一个问题。似乎数据生成器无法按应有的方式将包放入队列。这样做太慢了。我怀疑,流媒体引擎线程的无休止循环正在消耗所有资源,因此减慢了其他一切

我很高兴知道如何处理这个问题或其他设计模式,它们可以优雅地解决这个问题

要求为:2个线程,基本上并行运行。一种是将数据放入队列。另一个正在从队列中读取/删除。我想定期测量队列的大小,以便知道从队列中读取/删除的引擎是否足够快,能够处理生成的包


共 (1) 个答案

  1. # 1 楼答案

    您可以使用BlockingQueue,例如ArrayBlockingQueue,您可以将它们初始化为某个大小,这样队列中的项目数将永远不会超过某个数量,如下例所示:

    // create queue, max size 100
    final ArrayBlockingQueue<String> strings = new ArrayBlockingQueue<>(100);
    final String stop = "STOP";
    
    // start producing
    Runnable producer = new Runnable() {
      @Override
      public void run() {
        try {
          for(int i = 0; i < 1000; i++) {
            strings.put(Integer.toHexString(i));
          }
          strings.put(stop);
        } catch(InterruptedException ignore) {
        }
      }
    };
    
    Thread producerThread = new Thread(producer);
    producerThread.start();
    
    // start monitoring
    Runnable monitor = new Runnable() {
      @Override
      public void run() {
        try {
          while (true){
            System.out.println("Queue size: " + strings.size());
            Thread.sleep(5);
          }
        } catch(InterruptedException ignore) {
        }
      }
    };
    Thread monitorThread = new Thread(monitor);
    monitorThread.start();
    
    // start consuming
    Runnable consumer = new Runnable() {
      @Override
      public void run() {
        // infinite look, will interrupt thread when complete
        try {
          while(true) {
            String value = strings.take();
            if(value.equals(stop)){
              return;
            }
            System.out.println(value);
          }
        } catch(InterruptedException ignore) {
        }
      }
    };
    
    Thread consumerThread = new Thread(consumer);
    consumerThread.start();
    
    // wait for producer to finish
    producerThread.join();
    consumerThread.join();
    
    // interrupt consumer and monitor
    monitorThread.interrupt();
    

    您还可以让第三个线程监视队列的大小,以便了解哪个线程的速度快于另一个线程

    此外,您还可以使用timed put方法和timed或untimed offer方法,这将使您能够更好地控制队列已满或已空时的操作。在上面的示例中,执行将停止,直到有空间容纳下一个元素或者队列中没有其他元素